package com.atguigu.utils;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.SimpleStringSerializer;

import java.util.Properties;

public class DorisUtil {

    public static DorisSink<String> getDorisSink(String tableName) {

//        Properties props = new Properties();
//        props.setProperty("format", "json");
//        props.setProperty("read_json_by_line", "true"); // 每行一条 json 数据
//
//        return DorisSink.<String>builder()
//                .setDorisReadOptions(DorisReadOptions.builder().build())
//                .setDorisOptions(DorisOptions.builder()
//                        .setFenodes("hadoop102:7030")
//                        .setTableIdentifier("gmall_230315." + tableName)
//                        .setUsername("root")
//                        .setPassword("000000")
//                        .build())
//                .setDorisExecutionOptions(DorisExecutionOptions.builder()
//                        .disable2PC()
//                        .setMaxRetries(2)
//                        .setCheckInterval(1)
//                        .setBufferSize(1024 * 1024)
//                        .setBufferCount(5)
//                        .setDeletable(false)
//                        .setStreamLoadProp(props) //默认为TSV
//                        .build())
//                .setSerializer(new SimpleStringSerializer())
//                .build();

        Properties props = new Properties();
        props.setProperty("format", "json");
        props.setProperty("read_json_by_line", "true"); // 每行一条 json 数据
        props.setProperty("two_phase_commit", "false");

        return DorisSink.<String>builder()
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisOptions(DorisOptions.builder() // 设置 doris 的连接参数
                        .setFenodes("hadoop102:7030")
                        .setTableIdentifier("gmall_230315." + tableName)
                        .setUsername("root")
                        .setPassword("000000")
                        .build())
                .setDorisExecutionOptions(DorisExecutionOptions.builder() // 执行参数
                        //.setLabelPrefix("doris-label")  // stream-load 导入的时候的 label 前缀
                        .setStreamLoadProp(props) // 设置 stream load 的数据格式 默认是 csv,需要改成 json
                        .disable2PC() // 开启两阶段提交后,labelPrefix 需要全局唯一,为了测试方便禁用两阶段提交
                        .setDeletable(false)
                        .setBufferCount(3) // 批次条数: 默认 3
                        .setBufferSize(8 * 1024) // 批次大小: 默认 1M
                        .setCheckInterval(3000) // 批次输出间隔   三个对批次的限制是或的关系
                        .setMaxRetries(3)
                        .build())
                .setSerializer(new SimpleStringSerializer())
                .build();

    }

}
